package com.ideal.hadoopadmin.crontab.hdfs;

import com.ideal.hadoopadmin.crontab.db.ConnectionManager;
import com.ideal.hadoopadmin.crontab.tool.Tools;
import com.ideal.tools.ssh.common.CommonProperties;
import com.ideal.tools.ssh.common.CommonTools;
import com.ideal.tools.ssh.common.OperationMarket;
import com.ideal.tools.ssh.context.ClusterContext;
import com.ideal.tools.ssh.entity.LinuxMachine;
import org.apache.commons.lang3.StringUtils;
import com.ideal.hadoopadmin.crontab.property.Properties;
import java.io.File;
import java.sql.Connection;
import java.text.SimpleDateFormat;
import java.util.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FlushHDFSInfo {
    /** Initializes MySQL settings via {@code Properties.initMySQL()} on construction. */
    public FlushHDFSInfo() {
        Properties.initMySQL();
    }
    private static Logger logger = LoggerFactory.getLogger(FlushHDFSInfo.class);

    /**
     * Daily HDFS metadata refresh:
     * 1. generate hdfsMetaData&lt;timestamp&gt;.txt on the remote WebAPP host;
     * 2. read it back, keeping only "private"/"public" entries;
     * 3. read meta_hdfs_info_bak from the database;
     * 4. reconcile the table against the file (file wins).
     *
     * @param context cluster execution context (machines + properties)
     */
    public void HDFSDailyRefreshNew(ClusterContext context) {
        StringBuffer hql = new StringBuffer();
        List<String> sqlList = new ArrayList<String>();
        HashMap<String, String> fileEntries = new HashMap<String, String>();
        HashMap<String, String> userIdByName = new HashMap<String, String>();
        int count = 1;

        String initPath = context.getCommonProperties().getProperty(CommonProperties.WEBAPP_INIT_PATH, "") + "/";
        String fileName = Properties.HDFS_INFO_FILENAME + Tools.getCurrentDate() + ".txt";
        String hdfsMetaFile = initPath + fileName;

        // Produce the metadata file on the remote host.
        createHdfsFile(context, hdfsMetaFile);
        // In the demo/test environment the file is first pulled to the local disk.
        hdfsMetaFile = DownloadOrNo(context, fileName, hdfsMetaFile);

        // Load the generated file; nothing to do when it is empty.
        List<String> hdfsContent = getReadFile(hdfsMetaFile);
        if (hdfsContent.isEmpty()) {
            logger.warn("###FlushHDFSinfo: Reader log file is empty !");
            return;
        }

        // Index file rows by HDFS path (filters private/public) and start the user lookup SQL.
        getHdfsFileToMapSql(hdfsContent, count, fileEntries, hql);
        // Resolve clusterUser names to their ids.
        getHdfsUserCluseterMap(userIdByName, hql);
        // Snapshot of meta_hdfs_info_bak.
        List<Map<String, Object>> resHdfsInfo = getHdfsDbInfo(hql);
        // Delete DB rows missing from the file; drop file rows already in the DB.
        getHdfsCompareFileAndDb(sqlList, hql, fileEntries, resHdfsInfo);
        // Insert the remaining (new) file rows.
        getHdsfInputSql(sqlList, hql, fileEntries, userIdByName);
    }

    /**
     * In the demo/test environment ({@code Tools.DemoChangeDataBase == 1}) the remote
     * metadata file is downloaded from the WebAPP machine into the current working
     * directory and the local path is returned; otherwise the remote path is
     * returned unchanged.
     */
    private String DownloadOrNo(ClusterContext context, String fileName, String hdfsMetaFile) {
        if (Tools.DemoChangeDataBase != 1) {
            return hdfsMetaFile;
        }
        String localDir = System.getProperty("user.dir") + System.getProperty("file.separator");
        List<LinuxMachine> machines = context.getOriginalList();
        List<LinuxMachine> webAppMachines = CommonTools.getMachineListByType(machines, context,
                LinuxMachine.MachineType.WebAPP);
        for (LinuxMachine machine : webAppMachines) {
            if (machine.getMachineType() == LinuxMachine.MachineType.WebAPP) {
                CommonTools.downLoadFileToLocal(machine.getSshAuthor(), hdfsMetaFile, localDir);
            }
        }
        return localDir + fileName;
    }

    /**
     * Reads the generated HDFS metadata file into a list of lines.
     * Returns an empty list when the file is missing, unreadable, or empty
     * (including a file containing only a UTF-8 BOM).
     * In the demo environment the local copy is deleted after reading.
     *
     * @param hdfsMetaFile absolute path of the metadata file
     * @return the file's lines, or an empty list when there is nothing to process
     */
    private List<String> getReadFile(String hdfsMetaFile) {
        logger.debug("==begin readFile ....==");
        File file = new File(hdfsMetaFile);
        if (!file.exists()) {
            logger.info("###FlushHDFSinfo: Reader log file error, please check path of log file!");
            return new ArrayList<String>();
        }
        List<String> hdfsContent = Tools.readFile2List(hdfsMetaFile);
        // Guard isEmpty() before get(0): the previous version indexed a possibly-empty list.
        if (null == hdfsContent || hdfsContent.isEmpty() || "\uFEFF".equals(hdfsContent.get(0).trim())) {
            logger.info("###FlushHDFSinfo: Reader log file is empty !");
            return new ArrayList<String>();
        }
        logger.debug("###FlushHDFSinfo: read {} lines from {}", hdfsContent.size(), hdfsMetaFile);
        if (Tools.DemoChangeDataBase == 1) {
            // The local demo copy is temporary; remove it once consumed.
            if (!file.delete()) {
                logger.warn("###FlushHDFSinfo: could not delete temp file {}", hdfsMetaFile);
            }
        }
        return hdfsContent;
    }

    // Converts hdfsContent lines into hdfsfileMap (keyed by HDFS path) and starts
    // building the cluster_user lookup SQL; the quoted IN(...) list of user names
    // is appended by getFileSqltoFileAndMap.
    private void getHdfsFileToMapSql(List<String> hdfsContent, int count, HashMap<String, String> hdfsfileMap, StringBuffer hql) {
        hql.append("SELECT DISTINCT id,userName from cluster_user where userName in (");
        getFileSqltoFileAndMap(hdfsContent, count, hdfsfileMap, hql);
    }

    /**
     * Parses each metadata line into hdfsfileMap (keyed by HDFS path) and appends
     * the quoted user-name IN(...) list to {@code hql}.
     *
     * Expected line layout (space separated, produced by createHdfsFile's awk):
     *   clusterUser {private|public} hdfsPath hdfsPerm createTime hdfsOwner hdfsGroup
     * Map value layout (space separated):
     *   hdfsPath clusterUser hdfsGroup hdfsOwner hdfsPerm createTime properties
     * where properties is 1 for "private" and 0 for "public"; rows with other
     * visibility values still contribute their user name to the IN list.
     *
     * Fixes over the previous version: a malformed (short) line no longer throws
     * ArrayIndexOutOfBoundsException, the visibility token is not re-split, and the
     * IN list is assembled with a joiner so a skipped line cannot leave a trailing
     * comma. {@code count} is retained for signature compatibility.
     */
    private void getFileSqltoFileAndMap(List<String> hdfsContent, int count, HashMap<String, String> hdfsfileMap, StringBuffer hql) {
        StringBuilder inList = new StringBuilder();
        for (String filePath : hdfsContent) {
            String[] tem = filePath.split(" ");
            if (tem.length < 7) {
                logger.warn("###FlushHDFSinfo: skip malformed metadata line: {}", filePath);
                continue;
            }
            // Reorder into the DB column order used by getHdsfInputSql.
            String row = tem[2] + " " + tem[0] + " " + tem[6] + " " + tem[5] + " " + tem[3] + " " + tem[4];
            if ("private".equals(tem[1])) {
                hdfsfileMap.put(tem[2], row + " 1");
            } else if ("public".equals(tem[1])) {
                hdfsfileMap.put(tem[2], row + " 0");
            }
            if (inList.length() > 0) inList.append(",");
            inList.append("'").append(tem[0]).append("'");
        }
        hql.append(inList).append(")");
    }

    /**
     * Executes the cluster_user lookup held in {@code hql} and fills
     * {@code userCluseterMap} with userName -> id entries.
     */
    private void getHdfsUserCluseterMap(HashMap<String, String> userCluseterMap, StringBuffer hql) {
        Connection conn = ConnectionManager.getConnection();
        List<Map<String, Object>> rows = ConnectionManager.queryDB(conn, hql.toString());
        if (rows.isEmpty()) {
            logger.info("###FlushHDFSinfo: cluster_user table is null!");
            return;
        }
        for (Map<String, Object> row : rows) {
            userCluseterMap.put(row.get("USERNAME").toString(), row.get("ID").toString());
        }
    }

    /**
     * Loads the full meta_hdfs_info_bak snapshot. The shared {@code hql} buffer
     * is reset and left holding the executed query.
     */
    private List<Map<String, Object>> getHdfsDbInfo(StringBuffer hql) {
        hql.setLength(0);
        hql.append("SELECT id,hdfsPath,clusterUserId,hdfsGroup,hdfsOwner,hdfsPerm from meta_hdfs_info_bak");
        return ConnectionManager.queryDB(ConnectionManager.getConnection(), hql.toString());
    }

    /**
     * Reconciles DB rows against the file cache: rows present in both are removed
     * from the file map (nothing to insert), rows only in the DB have their ids
     * collected and deleted in one batch. No-op when the DB snapshot is empty.
     */
    private void getHdfsCompareFileAndDb(List<String> sqlList, StringBuffer hql, HashMap<String, String> hdfsfileMap, List<Map<String, Object>> resHdfsInfo) {
        if (resHdfsInfo.isEmpty()) {
            return;
        }
        hql.setLength(0);
        // Record stale DB ids / drop file rows that already exist in the DB.
        getCompareFileAndMap(hql, hdfsfileMap, resHdfsInfo);
        // Batch-delete the stale ids recorded above.
        getDelDbErrorFile(sqlList, hql, 1);
    }

    /**
     * Deletes the meta_hdfs_info_bak rows whose ids were accumulated
     * (space separated) in {@code hql}. No-op when nothing was recorded.
     */
    private void getDelDbErrorFile(List<String> sqlList, StringBuffer hql, int count) {
        String[] ids = hql.toString().split(" ");
        if (hql.length() == 0) {
            return;
        }
        hql.setLength(0);
        // Re-quote the ids into a comma-separated IN list.
        for (String id : ids) {
            hql.append(count != ids.length ? "'" + id + "'," : "'" + id + "'");
            count++;
        }
        sqlList.add("delete from meta_hdfs_info_bak where id in (" + hql.toString() + ")");
        Tools.exeSQLBatch(sqlList);
    }

    /**
     * Compares DB rows with the file map, iterating the DB rows in reverse order
     * (preserved from the original so the recorded id order stays stable).
     */
    private void getCompareFileAndMap(StringBuffer hql, HashMap<String, String> hdfsfileMap, List<Map<String, Object>> resHdfsInfo) {
        for (int i = resHdfsInfo.size() - 1; i >= 0; i--) {
            Map<String, Object> row = resHdfsInfo.get(i);
            Object path = row.get("HDFSPATH");
            if (hdfsfileMap.get(path) != null) {
                // Row exists in both places: nothing to insert for it.
                hdfsfileMap.remove(path);
            } else {
                // Row only in the DB: remember its id for batch deletion.
                hql.append(row.get("ID") + " ");
            }
        }
    }

    /**
     * Inserts the remaining entries of hdfsfileMap into meta_hdfs_info_bak.
     * Each map value is "hdfsPath clusterUser hdfsGroup hdfsOwner hdfsPerm createTime properties";
     * entries whose clusterUser has no id in userCluseterMap are skipped.
     * Logs when nothing is inserted, then executes the batch (possibly empty).
     *
     * NOTE(review): the INSERT is assembled via Tools.getCreateSqlParam with
     * embedded spaces inside some quoted fragments (e.g. " ','" and " ')") —
     * presumably the helper trims/joins these as expected; confirm before changing.
     */
    private void getHdsfInputSql(List<String> sqlList, StringBuffer hql, HashMap<String, String> hdfsfileMap, HashMap<String, String> userCluseterMap) {
        sqlList.clear();
        Iterator<Map.Entry<String, String>> iterator = hdfsfileMap.entrySet().iterator();
        while (iterator.hasNext()) {
            hql.setLength(0);
            Map.Entry<String, String> entry = iterator.next();
            String[] tem = entry.getValue().split(" ");
            // Skip entries whose user is unknown to cluster_user.
            if (null == userCluseterMap.get(tem[1])) continue;
            Tools.getCreateSqlParam(hql, " insert into meta_hdfs_info_bak (hdfsPath,clusterUserId"
                    , " ,hdfsGroup,hdfsOwner,hdfsPerm,note,createTime,properties) values ("
                    , " '" + tem[0] + "'," + "'" + userCluseterMap.get(tem[1]) + "'," + "'" + tem[2]
                    , " ','" + tem[3] + "', '" + tem[4] + "','','"
                    , System.currentTimeMillis() + "','" + tem[6] + " ')");
            sqlList.add(hql.toString());
            tem = null;
        }
        if(sqlList.isEmpty()){
            logger.info("###FlushHDFSinfo: user of log file not change with cluster_user table, please cheack!");
        }
        Tools.exeSQLBatch(sqlList);
    }

    /**
     * Runs the metadata-listing command on the (single) WebAPP machine, writing
     * the result to {@code hdfsMetaFile} on that host. The awk filter keeps only
     * directories under /user/&lt;x&gt;/&lt;y&gt; and emits:
     * user visibility path perm date:time owner group.
     *
     * @param context      cluster execution context
     * @param hdfsMetaFile remote path the listing is redirected to
     * @throws IllegalStateException if no WebAPP machine is configured
     */
    public static void createHdfsFile(ClusterContext context, String hdfsMetaFile) {
        String cmd = "sudo -u hdfs hadoop fs -ls /user/*/*/|" +
                "awk '{arrlen=split($8,arr,\"/\");if(arrlen>=5 && substr($1,1,1)==\"d\") print arr[3] \" \" arr[4] \" \" $8 \" \" $1\" \"$6\":\"$7\" \"$3\" \"$4}'" +
                " > " + hdfsMetaFile;
        List<LinuxMachine> machineList = context.getOriginalList();
        List<LinuxMachine> finalMachines = CommonTools.getMachineListByType(machineList,
                context, LinuxMachine.MachineType.WebAPP);
        // There is only ever one WebAPP machine; fail with a clear message rather
        // than an IndexOutOfBoundsException when none is configured.
        if (finalMachines.isEmpty()) {
            throw new IllegalStateException("###FlushHDFSinfo: no WebAPP machine configured in cluster context");
        }
        finalMachines.get(0).initOperation(OperationMarket.ExeOneShellCMD(cmd));
        // Narrow the context to the WebAPP machine(s), execute, and print the result.
        context.setMachineList(finalMachines);
        context.doTheThing();
        context.printResult();
    }

    /** Test hook: runs the daily HDFS refresh against a demo ClusterContext. */
    public void call(){
        // Build a demo ClusterContext (test use only).
        HDFSDailyRefreshNew(Tools.getDemoClusterContextDemo());
    }

    /**
     * Builds a minimal ClusterContext carrying only the /etc/group path property
     * and refreshes the HDFS access table from it.
     */
    public void callHDFSAccess() {
        Map<String, String> propertyMap = new HashMap<String, String>();
        String groupPath = Properties.instance().getPropertyByKey(Properties.Linux_Group_Path, "");
        propertyMap.put(Properties.Linux_Group_Path, groupPath);
        initHDFSAccess(new ClusterContext(new CommonProperties(propertyMap)));
    }

    /**
     * Rebuilds meta_hdfs_access from the /etc/group file: the table is truncated
     * and repopulated with one row per (user, hdfs path) grant derived from the
     * group memberships.
     */
    public void initHDFSAccess(ClusterContext context) {
        String path;
        if (Tools.DemoChangeDataBase == 1) {
            // Demo/test: bundled group fixture under the working directory.
            String sep = System.getProperty("file.separator");
            path = System.getProperty("user.dir") + sep + "DemoFile" + sep + "group";
        } else {
            // Production: path configured via Linux_Group_Path.
            path = context.getCommonProperties().getProperty(Properties.Linux_Group_Path, "");
        }
        // Parse /etc/group and derive the INSERT statements.
        Map<String, String> groupMap = readGroupInfo(path);
        List<String> insertSQL = makeHDFSVistorInsertSQL(groupMap);
        // Wipe the table before reloading it.
        List<String> delSQLList = new ArrayList<String>();
        delSQLList.add("truncate table meta_hdfs_access");
        Tools.exeSQLBatch(delSQLList);
        Tools.exeSQLBatch(insertSQL);
    }

    /**
     * Builds the INSERT statements for meta_hdfs_access: for every row of
     * meta_hdfs_info, each member of its hdfsGroup (per {@code groupMap}) that is
     * a known cluster user and has a matching meta_hdfs_info entry yields one row.
     *
     * Fixes over the previous version: removed an unused SimpleDateFormat, a dead
     * assignment before {@code continue}, and a commented-out lookup line.
     *
     * @param groupMap group name -> comma-separated member list (from /etc/group)
     * @return the INSERT statements (possibly empty)
     */
    public List<String> makeHDFSVistorInsertSQL(Map<String, String> groupMap) {
        List<String> sqlList = new ArrayList<String>();
        String sql = "SELECT clusterUserId,hdfsPath,hdfsGroup from meta_hdfs_info";
        Connection conn = ConnectionManager.getConnection();
        // userName -> id and (hdfsPath + clusterUserId) -> meta_hdfs_info id lookups.
        Map<String, String> userMap = getUserInfo();
        Map<String, String> hdfsInfoMap = getHdfsInfo();
        List<Map<String, Object>> rsList = ConnectionManager.queryDB(conn, sql);
        // createTime is stored as epoch milliseconds, one timestamp for the batch.
        String curDate = String.valueOf(System.currentTimeMillis());
        for (Map<String, Object> hdfsinfo : rsList) {
            String group = hdfsinfo.get("HDFSGROUP").toString();
            String visiters = groupMap.get(group);
            if (visiters == null) visiters = "";
            for (String singleVisitor : visiters.split(",")) {
                String userid = userMap.get(singleVisitor);
                if (userid == null) continue; // not a known cluster user
                String hdfsInfoId = hdfsInfoMap.get(hdfsinfo.get("HDFSPATH").toString() + userid);
                if (StringUtils.isBlank(hdfsInfoId)) continue;
                sqlList.add("insert into meta_hdfs_access(clusterUserId,hdfsInfoId,createTime) values" +
                        "('" + userid + "','" + hdfsInfoId + "','" + curDate + "')");
            }
        }
        return sqlList;
    }

    /**
     * Parses an /etc/group style file into a map of group name -> comma-separated
     * member list ("" when the group has no member field).
     *
     * Fix: returns an empty map when the file cannot be read
     * (Tools.readFile2List may return null, see getReadFile) instead of throwing
     * a NullPointerException.
     *
     * @param path path to the group file
     */
    public Map<String, String> readGroupInfo(String path) {
        Map<String, String> groupMap = new HashMap<String, String>();
        List<String> groupList = Tools.readFile2List(path);
        if (groupList == null) {
            logger.warn("###FlushHDFSinfo: cannot read group file: {}", path);
            return groupMap;
        }
        for (String groupStr : groupList) {
            // /etc/group format: name:password:gid:member1,member2,...
            String[] tmp = groupStr.split(":");
            groupMap.put(tmp[0], tmp.length >= 4 ? tmp[3] : "");
        }
        return groupMap;
    }

    /**
     * Loads cluster_user into a map of userName -> id.
     */
    public Map<String, String> getUserInfo() {
        String sql = "SELECT id,userName from cluster_user";
        Map<String, String> userMap = new HashMap<String, String>();
        Connection conn = ConnectionManager.getConnection();
        for (Map<String, Object> user : ConnectionManager.queryDB(conn, sql)) {
            userMap.put(user.get("USERNAME").toString(), user.get("ID").toString());
        }
        return userMap;
    }
    /**
     * Loads meta_hdfs_info into a map keyed by hdfsPath + clusterUserId, valued
     * with the row id (mirrors the lookup key used in makeHDFSVistorInsertSQL).
     */
    public Map<String, String> getHdfsInfo() {
        String sql = "SELECT id,hdfsPath,clusterUserId from meta_hdfs_info";
        Map<String, String> infoMap = new HashMap<String, String>();
        Connection conn = ConnectionManager.getConnection();
        for (Map<String, Object> hdfs : ConnectionManager.queryDB(conn, sql)) {
            infoMap.put(hdfs.get("HDFSPATH").toString() + hdfs.get("CLUSTERUSERID"), hdfs.get("ID").toString());
        }
        return infoMap;
    }


    /**
     * Manual entry point: runs the daily HDFS refresh against the demo cluster
     * context (test use only).
     */
    public static void main(String s[]) {
        FlushHDFSInfo fo = new FlushHDFSInfo();
        // Demo: refresh HDFS metadata.
        fo.HDFSDailyRefreshNew(Tools.getDemoClusterContextDemo());
        // Initialize HDFS accessible users (disabled).
//        fo.initHDFSAccess(Tools.getDemoClusterContextDemo());
    }
}
